-
Notifications
You must be signed in to change notification settings - Fork 125
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
athena-driver #3014
athena-driver #3014
Conversation
runtime/drivers/athena/athena.go
Outdated
{ | ||
Key: "output.location", | ||
DisplayName: "Output location", | ||
Description: "Oputut location for query results in S3.", | ||
Placeholder: "bucket-name", | ||
Type: drivers.StringPropertyType, | ||
Required: true, | ||
}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- I would suggest
output_location
. We've used dots before, but moving away from it (supposed to be a shorthand for nested fields only) - For the placeholder – it should be something like
s3://bucket/path
, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Nope, just a backet name.
runtime/drivers/athena/athena.go
Outdated
{ | ||
Key: "profile.name", | ||
DisplayName: "AWS profile", | ||
Description: "AWS profile for credentials.", | ||
Type: drivers.StringPropertyType, | ||
Required: true, | ||
}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be optional, right? Also, we don't support AWS profiles in the S3 connector right now, so wondering if we should remove it (always use the default one), and contemplate it in a follow-up PR.
Also, should this be in ConfigProperties
and not SourceProperties
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, should this be in ConfigProperties and not SourceProperties?
There can be sources with different profiles (meanwhile profiles can be for the same AWS account).
runtime/drivers/athena/athena.go
Outdated
type configProperties struct { | ||
// SecretJSON string `mapstructure:"google_application_credentials"` | ||
// AllowHostAccess bool `mapstructure:"allow_host_access"` | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should support passing AWS access tokens directly here – see the S3 driver
runtime/drivers/athena/athena.go
Outdated
// DownloadFiles returns a file iterator over objects stored in gcs. | ||
// The credential json is read from config google_application_credentials. | ||
// Additionally in case `allow_host_credentials` is true it looks for "Application Default Credentials" as well | ||
func (c *Connection) DownloadFiles(ctx context.Context, source *drivers.BucketSource) (drivers.FileIterator, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Incorrect docstring
runtime/drivers/athena/athena.go
Outdated
prefix := "parquet_output_" + uuid.New().String() | ||
bucketName := strings.TrimPrefix(strings.TrimRight(conf.OutputLocation, "/"), "s3://") | ||
unloadPath := bucketName + "/" + prefix | ||
err = c.unload(ctx, conf, "s3://"+unloadPath) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to unload: %w", err) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Why can't we use the output location outright? The Athena code samples don't appear to do any rewriting of the output path: https://docs.aws.amazon.com/athena/latest/ug/code-samples.html
- If we need to rewrite the output location, use
url.Parse
and associated functions to safely edit the URL
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Athena Go API and
gocloud.dev/blob/s3blob
demand different S3 location format.
s3blob
needs only the bucket name:
s3bucket, err := s3blob.OpenBucketV2(context.TODO(), s3client, "athena-output-2820", nil)
and throws error InvalidBucketName: The specified bucket is not valid.
when s3://athena-output-2820
passed.
While Athena SDK complains InvalidRequestException: Invalid location athena-output-20287
if s3
prefix is not specified:
executeParams := &athena.StartQueryExecutionInput{
QueryString: aws.String("UNLOAD (SELECT * FROM cat.ptable limit 10) TO '%s' WITH (format = 'PARQUET')", "athena-output-20287"),
ResultConfiguration: resultConfig,
}
And we need to decide what the user should pass: "s3://bucket-name" or "bucket-name", or the user has the luxury to specify both and expect the application to figure it out.
Right now the implementation allows all approaches but it requires additional parameter transformations.
runtime/drivers/athena/athena.go
Outdated
r := retrier.New(retrier.LimitedExponentialBackoff(20, 100*time.Millisecond, 1*time.Second), nil) // 100 200 400 800 1000 1000 1000 1000 1000 1000 ... < 20 sec | ||
|
||
return r.Run(func() error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given the Athena latencies, I don't think a mixed exponential/linear strategy is needed. Can just keep it simple and do a loop with a sleep (will also make it easier to reason about how cancellation gets enforced). Or maybe a loop with a select
that checks both the timer and ctx.Done()
(to support faster cancellation)
runtime/drivers/athena/athena.go
Outdated
} | ||
|
||
// Get Query execution and check for the Query state constantly every 2 second | ||
executionID := *athenaExecution.QueryExecutionId |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Redundant dereference (using &executionID
in call to GetQueryExecutionInput
runtime/drivers/athena/athena.go
Outdated
status, stateErr := client.GetQueryExecution(ctx, &athena.GetQueryExecutionInput{ | ||
QueryExecutionId: &executionID, | ||
}) | ||
|
||
if stateErr != nil { | ||
return stateErr | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can just use the normal var name err
– no apparent conflict in scope
runtime/drivers/athena/athena.go
Outdated
{ | ||
Key: "output_location", | ||
DisplayName: "S3 output location", | ||
Description: "Oputut location for query results in S3.", | ||
Placeholder: "mybucket", | ||
Type: drivers.StringPropertyType, | ||
Required: true, | ||
}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Athena docs uses a s3://bucket/path
output location format, so we should support the same and use that as the placeholder
runtime/drivers/athena/athena.go
Outdated
{ | ||
Key: "region", | ||
DisplayName: "AWS region", | ||
Description: "AWS profile for credentials.", | ||
Type: drivers.StringPropertyType, | ||
Required: true, | ||
}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Description doesn't match
runtime/drivers/athena/athena.go
Outdated
func (d driver) HasAnonymousSourceAccess(ctx context.Context, src drivers.Source, logger *zap.Logger) (bool, error) { | ||
return false, fmt.Errorf("not implemented") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should return false, nil
) | ||
.required("Source name is required"), | ||
output_location: yup.string().required(), | ||
region: yup.string(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's marked required in the spec, but not here
case "athena": | ||
return &drivers.BucketSource{ | ||
Properties: props, | ||
}, nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See note about implementing as a DatabaseSource
runtime/drivers/athena/athena.go
Outdated
cfg, err := awsconfig.LoadDefaultConfig( | ||
ctx, | ||
awsconfig.WithRegion(conf.Region), | ||
awsconfig.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(c.config.AccessKeyID, c.config.SecretAccessKey, c.config.SessionToken)), | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this work for environment credentials (in ~/.aws
)? See the S3 connector – the expected behavior is: use access key if provided, else fallback to environment credentials unless AllowHostAccess
is false.
runtime/drivers/athena/athena.go
Outdated
prefix := "parquet_output_" + uuid.New().String() | ||
bucketName := strings.TrimPrefix(strings.TrimRight(conf.OutputLocation, "/"), "s3://") | ||
unloadPath := bucketName + "/" + prefix | ||
err = c.unload(ctx, cfg, conf, "s3://"+unloadPath) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See earlier note – it should take OutputLocation
in the s3://bucket/path
format, and then we can use url.Parse
to parse it and obtain the bucket name for openBucket
.
We should also make sure it supports nested output locations, as far as I can tell, it can't be assumed the bucket is dedicated to only Athena?
runtime/drivers/athena/athena.go
Outdated
func (c *Connection) openBucket(ctx context.Context, conf *sourceProperties, bucket string) (*blob.Bucket, error) { | ||
cfg, err := awsconfig.LoadDefaultConfig( | ||
ctx, | ||
awsconfig.WithRegion(conf.Region), | ||
awsconfig.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(c.config.AccessKeyID, c.config.SecretAccessKey, c.config.SessionToken)), | ||
) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
s3client := s3v2.NewFromConfig(cfg) | ||
return s3blob.OpenBucketV2(ctx, s3client, bucket, nil) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This duplicates the awsconfig
from DownloadFiles
and may have the same credentials issues
runtime/drivers/athena/athena.go
Outdated
r := retrier.New(retrier.ConstantBackoff(20, 1*time.Second), nil) | ||
|
||
return r.RunCtx(ctx, func(ctx context.Context) error { | ||
status, err := client.GetQueryExecution(ctx, &athena.GetQueryExecutionInput{ | ||
QueryExecutionId: athenaExecution.QueryExecutionId, | ||
}) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
state := status.QueryExecution.Status.State | ||
|
||
if state == types.QueryExecutionStateSucceeded || state == types.QueryExecutionStateCancelled { | ||
return nil | ||
} else if state == types.QueryExecutionStateFailed { | ||
return fmt.Errorf("Athena query execution failed %s", *status.QueryExecution.Status.AthenaError.ErrorMessage) | ||
} | ||
return fmt.Errorf("Execution is not completed yet, current state: %s", state) | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- This seems to translate to a 20 second query execution timeout? It's too little, it should probably continue to check until the
ctx
is cancelled. (It would also return a weird retry error message when hitting the 20s timeout.) - It seems it will continue to retry even if query execution failed?
- I don't see why this warrants a third party library dependency instead of a for loop and simple
select
that checks a timer and context cancellation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Nope, see
func (c DefaultClassifier) Classify(err error) Action {
if err == nil {
return Succeed
}
return Retry
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- We've already imported that lib and it provides an opinionated approach that one doesn't need to reinvent and it presents itself right away
retrier
,ConstantBackoff
- the intention is more recognisable from those words. If I see a low level Timer then I need read more scrupulously to figure out if it's just a simple retry pattern or something else more complex.
runtime/drivers/athena/athena.go
Outdated
return err | ||
} | ||
|
||
func (c *Connection) DownloadFiles(ctx context.Context, source *drivers.BucketSource) (drivers.FileIterator, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems it doesn't call cleanPath
on success cases? It's probably easiest to wrap the file iterator with an object that calls cleanPath
on close
Another comment – please also add docs changes for the Athena connector. And @nishantmonu51 requested to let people know that having an S3 file retention rule for the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Two other questions:
- It's missing the ability to configure the AWS region now?
- What region will be used if the workgroup is nil? Is it guaranteed to be the output_location one?
runtime/drivers/athena/athena.go
Outdated
{ | ||
Key: "athena_output_location", | ||
DisplayName: "S3 output location", | ||
Description: "Oputut location for query results in S3.", | ||
Placeholder: "s3://bucket-name/path/", | ||
Type: drivers.StringPropertyType, | ||
Required: true, | ||
}, | ||
{ | ||
Key: "athena_workgroup", | ||
DisplayName: "AWS Athena workgroup", | ||
Description: "AWS Athena workgroup to use for queries.", | ||
Type: drivers.StringPropertyType, | ||
Required: false, | ||
}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can omit the athena_
prefix for the keys, it's implied when the code file starts with type: athena
case "athena": | ||
vars["aws_access_key_id"] = env["aws_access_key_id"] | ||
vars["aws_secret_access_key"] = env["aws_secret_access_key"] | ||
vars["aws_session_token"] = env["aws_session_token"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also needed in runtime/connections.go#connectorConfig
func sourceReader(paths []string, format string, ingestionProps map[string]any, fromAthena bool) (string, error) { | ||
// Generate a "read" statement | ||
if containsAny(format, []string{".csv", ".tsv", ".txt"}) { | ||
// CSV reader | ||
return generateReadCsvStatement(paths, ingestionProps) | ||
} else if strings.Contains(format, ".parquet") { | ||
} else if strings.Contains(format, ".parquet") || fromAthena { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change shouldn't be needed, the sqlstore_to_duckdb
transporter already sets the format to format := fileutil.FullExt(files[0])
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Athena outputs parquet files without extension, and there's no configuration to change that. So the source reader doesn't have anything to detect parquet
format here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Partly changed this so that there is no fromAthena
arg but .parquet
extension is added to format
runtime/drivers/athena/sql_store.go
Outdated
// ie | ||
// outputLocation s3://bucket-name/prefix | ||
// unloadLocation s3://bucket-name/prefix/rill-connector-parquet-output-<uuid> | ||
// unloadPath prefix/rill-connector-parquet-output-<uuid> | ||
unloadFolderName := "parquet_output_" + uuid.New().String() | ||
bucketName := strings.Split(strings.TrimPrefix(outputLocation, "s3://"), "/")[0] | ||
unloadLocation := strings.TrimRight(outputLocation, "/") + "/" + unloadFolderName | ||
unloadPath := strings.TrimPrefix(strings.TrimPrefix(unloadLocation, "s3://"+bucketName), "/") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Use
url.Parse
for URI manipulation - Change
parquet_output_
torill_tmp_
runtime/drivers/athena/athena.go
Outdated
} else if conf.WorkGroup != "" { | ||
wo, err := client.GetWorkGroup(ctx, &athena.GetWorkGroupInput{ | ||
WorkGroup: aws.String(conf.WorkGroup), | ||
}) | ||
if err != nil { | ||
return "", err | ||
} | ||
return *wo.WorkGroup.Configuration.ResultConfiguration.OutputLocation, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is a workgroup's output location guaranteed to be non-nil?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not guaranteed, but specifying a workgroup for the purpose of not specifying output location and having this workgroup without a location asks for the fail-fast approach.
runtime/drivers/athena/athena.go
Outdated
r := retrier.New(retrier.ConstantBackoff(int(5*time.Minute/time.Second), time.Second), nil) // 5 minutes timeout | ||
return r.RunCtx(ctx, func(ctx context.Context) error { | ||
status, err := client.GetQueryExecution(ctx, &athena.GetQueryExecutionInput{ | ||
QueryExecutionId: athenaExecution.QueryExecutionId, | ||
}) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
state := status.QueryExecution.Status.State | ||
|
||
if state == types.QueryExecutionStateSucceeded || state == types.QueryExecutionStateCancelled { | ||
return nil | ||
} else if state == types.QueryExecutionStateFailed { | ||
return fmt.Errorf("Athena query execution failed %s", *status.QueryExecution.Status.AthenaError.ErrorMessage) | ||
} | ||
return fmt.Errorf("Athena ingestion timeout") | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still don't see how this will stop retrying on the query failed case. If it returns an error, it retries, right?
Also, is polling every second recommended? What's the recommended polling interval?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the recommended polling interval?
There's no recommendation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is 1 sec in code samples
runtime/drivers/athena/athena.go
Outdated
|
||
func cleanPath(ctx context.Context, cfg aws.Config, bucketName, prefix string) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All the functions from here and after in this file are related to sql_store.go
, so please move them there.
Also try to re-organize to follow the function ordering guidelines: https://github.com/uber-go/guide/blob/master/style.md#function-grouping-and-ordering
Yes, the region should be returned back. Right now, the region is resolved from the default aws configuration profile. |
# Conflicts: # runtime/drivers/duckdb/transporter/sqlstore_to_duckDB.go # web-common/src/features/sources/modal/AddSourceModal.svelte
cleanUp function Added AWS region and reordered functions Moved functions to sql_store Renaming and code refactoring
# Conflicts: # runtime/services/catalog/artifacts/yaml/objects.go # web-common/src/features/sources/modal/yupSchemas.ts
Returned the region, set its default value to |
|
runtime/drivers/athena/sql_store.go
Outdated
bucketObj, err := openBucket(ctx, awsConfig, bucketName) | ||
if err != nil { | ||
return nil, errors.Join(fmt.Errorf("cannot open bucket %q: %w", bucketName, err), cleanupFn()) | ||
} | ||
|
||
opts := rillblob.Options{ | ||
GlobPattern: unloadPath + "/**", | ||
} | ||
|
||
it, err := rillblob.NewIterator(ctx, bucketObj, opts, c.logger) | ||
if err != nil { | ||
return nil, errors.Join(fmt.Errorf("cannot download parquet output %q %w", opts.GlobPattern, err), cleanupFn()) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should it call cleanupFn
before returning in these error conditions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reduced the number of calls by moving the call into defer
runtime/drivers/athena/sql_store.go
Outdated
// outputLocation s3://bucket/path | ||
// unloadLocation s3://bucket/path/rill_tmp_<uuid> | ||
// unloadPath path/rill_tmp_<uuid> | ||
unloadFolderName := "rill_tmp_" + uuid.New().String() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Let's avoid mix underscores and dashes, always looks a bit weird. We could either use rill-tmp-
instead or do strings.Replace(uuid.New().String(), "-", "")
for the random characters
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Used rill-tmp-
runtime/drivers/athena/sql_store.go
Outdated
} | ||
|
||
func (c *Connection) unload(ctx context.Context, client *athena.Client, conf *sourceProperties, unloadLocation string) error { | ||
finalSQL := fmt.Sprintf("UNLOAD (%s) TO '%s' WITH (format = 'PARQUET')", conf.SQL, unloadLocation) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should put a newline after the injected SQL (UNLOAD (%s\n) ...
) in case there's a comment at the end of it
runtime/drivers/athena/sql_store.go
Outdated
tm := time.NewTimer(5 * time.Minute) | ||
defer tm.Stop() | ||
for { | ||
select { | ||
case <-tm.C: | ||
return fmt.Errorf("Athena ingestion timed out") | ||
default: | ||
status, err := client.GetQueryExecution(ctx, &athena.GetQueryExecutionInput{ | ||
QueryExecutionId: queryExecutionOutput.QueryExecutionId, | ||
}) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
switch status.QueryExecution.Status.State { | ||
case types2.QueryExecutionStateSucceeded, types2.QueryExecutionStateCancelled: | ||
return nil | ||
case types2.QueryExecutionStateFailed: | ||
return fmt.Errorf("Athena query execution failed %s", *status.QueryExecution.Status.AthenaError.ErrorMessage) | ||
} | ||
} | ||
time.Sleep(time.Second) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Instead of using a hard-coded timer, it can use the
ctx
(check<-ctx.Done()
), which has a timeout that's configurable through thetimeout:
YAML property. - If there's a
ctx
cancellation/timeout, it would be nice to cancel the running query. - For
case ..., types2.QueryExecutionStateCancelled
– shouldn't this case return an error? Otherwise, it will try to consume the results of the cancelled query.
runtime/drivers/athena/sql_store.go
Outdated
if out.IsTruncated { | ||
continuationToken = out.NextContinuationToken |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe also check out.NextContinuationToken != nil
just to be extra cautious that we don't loop forever?
fromAthena := reflect.TypeOf(s.from).AssignableTo(reflect.TypeOf(&athena.Connection{})) | ||
for iter.HasNext() { | ||
files, err := iter.NextBatch(_sqlStoreIteratorBatchSize) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
format := fileutil.FullExt(files[0]) | ||
if fromAthena { | ||
// Athena doesn't specify ".parquet" extension in output file names | ||
// Append ".parquet" extension to the extension generated by Athena | ||
format += ".parquet" | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This reflection really hurts... Is there not a way to have Athena give output files the right file extension with UNLOAD
?
If not, we should add a way to propagate the format, maybe by adding it in rillblob.Options
and exposing it as iter.Format()
or something like that.
# Conflicts: # runtime/drivers/blob/blobdownloader.go
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks solid!
Checklist
Summary
Issue addressed:
#3014
Details:
Adds Athena data source for data ingestion, see screenshots.
Steps to Verify